实例限流最佳实践

更新时间:

云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,本文介绍云消息队列 RabbitMQ 版实例的限流规则、限流后的行为以及限流最佳实践等。

限流阈值

实例总TPS限流阈值

实例系列

Serverless系列实例

预付费系列实例

规格

预留+弹性/按累积量

未开启弹性TPS

开启弹性TPS

企业版

铂金版

专业版

企业版

铂金版

专业版

限流阈值

最大5万次/秒

基础TPS流量峰值规格

基础TPS流量峰值规格的2倍,最大5万次/秒

基础TPS流量峰值规格的2倍,最大5万次/秒

基础TPS流量峰值规格的1.5

ConnectionTPS限流阈值

ConnectionTPS限流阈值为2.5万次/秒。

单接口的限流阈值

限制项

限制项接口

Serverless系列实例

预付费系列实例

预留+弹性/按累积量

企业版

铂金版

专业版

单实例同步获取消息

basicGet

500 TPS

500 TPS

单实例清Queue

purgeQueue

500 TPS

500 TPS

单实例创建Exchange

exchangeDeclare

500 TPS

500 TPS

单实例删除Exchange

exchangeDelete

500 TPS

500 TPS

单实例创建Queue

queueDeclare

500 TPS

500 TPS

单实例删除Queue

queueDelete

500 TPS

500 TPS

单实例创建Binding

queueBind

500 TPS

500 TPS

单实例删除Binding

queueUnbind

500 TPS

500 TPS

单实例恢复消息

basicRecover

500 TPS

500 TPS

单实例重入Queue消息

  • basicReject(requeue=true)

  • basicNack(requeue=true)

20 TPS

20 TPS

限流规则

云消息队列 RabbitMQ 版实例的TPS流量峰值超过您所购买实例的TPS规格上限时,云消息队列 RabbitMQ 版实例会被限流。

限流后的行为如下:

  • 云消息队列 RabbitMQ 版服务端会返回错误码信息。具体请参见错误码信息

  • 云消息队列 RabbitMQ 版服务端关闭当前请求的Channel,代码中可以捕获异常重新开启Channel,具体请参见错误码处理示例代码

错误码信息

  • 错误码:reply-code=530

  • 错误信息:reply-text=denied for too many requests

Java客户端错误堆栈示例:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>
(reply-code=530, reply-text=denied for too many requests, ReqId:5FB4C999314635F952FCBFF6, ErrorHelp[dstQueue=XXX_test_queue,
srcExchange=Producer.ExchangeName,bindingKey=XXX_test_bk, http://mrw.so/6rNqO8], class-id=50, method-id=20)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599)
    at java.lang.Thread.run(Thread.java:748)

错误码处理示例代码

Java语言为例,代码如下所示:

private static final int MAX_RETRIES = 5; // 最大重试次数
private static final long WAIT_TIME_MS = 2000; // 每次重试的等待时间(以毫秒为单位)

private void doAnythingWithReopenChannels(Connection connection, Channel channel) {
    try {
        // ......
        // 在当前通道channel下执行的任何操作
        // 例如消息发送、消费等
        // ......

    } catch (AlreadyClosedException e) {
        String message = e.getMessage();
        if (isChannelClosed(message)) {
            // 如果通道已经关闭,关闭并重新创建通道
            channel = createChannelWithRetry(connection); 
            // 在重连后可以继续执行其它操作
            // ......
        } else {
            throw e;
        }
    }
}

private Channel createChannelWithRetry(Connection connection) {
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
            return connection.createChannel();
        } catch (Exception e) {
            System.err.println("Failed to create channel. Attempt " + attempt + " of " + MAX_RETRIES);
            // 检查错误, 若仍是被限流导致的关闭错误,则可以等待后继续重试
            // 也可移除本部分重试逻辑
            if (attempt < MAX_RETRIES) {
                try {
                    Thread.sleep(WAIT_TIME_MS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // 还原中断状态
                }
            } else {
                throw new RuntimeException("Exceeded maximum retries to create channel", e);
            }
        }
    }
    throw new RuntimeException("This line should never be reached"); // 理论上不会到达这里
}

private boolean isChannelClosed(String errorMsg) {
    // 判断是否包含channel.close报错,该报错代表通道已关闭。
    // 可能涵盖530,541等错误信息。
    if (errorMsg != null && errorMsg.contains("channel.close")) {
        System.out.println("[ChannelClosed] Error details: " + errorMsg);
        return true;
    }
    return false;
}

实例秒级TPS峰值查询

通过查询实例实际使用的秒级TPS峰值,您可以了解业务的流量波动情况和流量峰值,判断实例规格是否满足业务需求。

云消息队列 RabbitMQ 版提供以下三种方式查询实例的秒级TPS峰值:

查询方式

说明

查询时间级别

查询资源级别

(推荐)通过云监控查询实例TPS峰值并设置告警

优势:

  • 查询结果最大可显示14天内的TPS峰值变化,可快速定位异常范围。

  • 支持将实例TPS峰值作为监控指标设置告警。

  • 支持免费使用。

分钟级TPS峰值

取值为1分钟周期内,每秒钟实例TPS的最大值。

实例级别TPS峰值

(推荐)通过实例详情查询实例TPS峰值

  • 优势:

    • 支持查询秒级TPS峰值,可精确异常范围。

    • 支持查看具体API接口的TPS峰值。

    • 支持免费使用。

  • 不足:为避免显示结果过多,只显示10分钟内的查询结果。

秒级TPS峰值

  • 实例级别TPS峰值

  • 实例内某个API接口的TPS峰值

通过日志查询实例TPS峰值

  • 优势:支持通过SLS分析语句查询,适合复杂问题定位场景。

  • 不足:

    • 相较于前两种查询方式,操作较复杂,查询结果不够直观。

    • 需要额外支付日志服务相关费用,具体计费信息,请参见日志服务计费项

秒级TPS峰值

实例级别TPS峰值

TPS被限流后怎么处理?

如果出现因TPS峰值设置不合理导致实例、Connection被限流,从而影响到您的业务,建议您按照以下解决办法处理。

单实例总TPS被限流的解决办法

  • 如果在测试或者流量峰值不确定、流量较少的短期场景,建议您为预付费系列实例开启弹性TPS能力或者使用Serverless系列实例。更多信息,请参见为实例开启弹性TPS功能

  • 如果在长期稳定、流量较高的业务运行场景,建议您升级TPS流量峰值规格。更多信息,请参见升级实例配置

ConnectionTPS被限流的解决办法

  • 云消息队列 RabbitMQ 版采用分布式集群架构。建议为每个队列创建多个连接(至少10个),以便客户端能够更均衡地连接到集群中的多个服务节点。这种方法可以有效避免出现负载热点问题,从而提高消息的发送和消费效率。

  • Spring用户推荐使用CachingConnectionFactoryCONNECTION模式,详情请参见Spring集成